package sample.cluster.client.grpc

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated }
import org.apache.pekko.event.LoggingAdapter
import org.apache.pekko.grpc.GrpcClientSettings
import org.apache.pekko.stream._
import org.apache.pekko.stream.scaladsl.Source

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future, Promise }

object ClusterClient {

  /**
   * Factory method for `ClusterClient` [[org.apache.pekko.actor.Props]].
   */
  def props(
      settings: ClusterClientSettings)(implicit materializer: Materializer): Props =
    Props(new ClusterClient(settings))

  sealed trait Command

  final case class Send(path: String, msg: Any, localAffinity: Boolean)
      extends Command {

    /**
     * Convenience constructor with `localAffinity` false
     */
    def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
  }

  /**
   * More efficient than `Send` for single request-reply interaction
   */
  final case class SendAsk(path: String, msg: Any, localAffinity: Boolean)
      extends Command {

    /**
     * Convenience constructor with `localAffinity` false
     */
    def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
  }

  final case class SendToAll(path: String, msg: Any) extends Command

  final case class Publish(topic: String, msg: Any) extends Command

  private def createClientStub(
      settings: ClusterClientSettings)(implicit sys: ActorSystem): ClusterClientReceptionistServiceClient = {
    ClusterClientReceptionistServiceClient(settings.grpcClientSettings)
  }

  private def newSession(
      settings: ClusterClientSettings,
      receptionistServiceClient: ClusterClientReceptionistServiceClient,
      sender: ActorRef,
      killSwitch: SharedKillSwitch,
      log: LoggingAdapter,
      serialization: ClusterClientSerialization)(implicit mat: Materializer): Future[ActorRef] = {
    val sessionReqRefPromise = Promise[ActorRef]()
    log.info("New session for {}", sender)
    receptionistServiceClient
      .newSession(
        Source
          .actorRef[Any](
            bufferSize = settings.bufferSize,
            overflowStrategy = OverflowStrategy.dropNew,
            // never complete from stream element
            completionMatcher = PartialFunction.empty,
            // never fail from stream element
            failureMatcher = PartialFunction.empty)
          // .actorRef[Any](bufferSize = settings.bufferSize, overflowStrategy = OverflowStrategy.dropNew)
          .via(killSwitch.flow)
          .map {
            case send: Send =>
              val payload = serialization.serializePayload(send.msg)
              Req().withSend(
                SendReq(send.path, send.localAffinity, Some(payload)))
            case sendToAll: SendToAll =>
              val payload = serialization.serializePayload(sendToAll.msg)
              Req().withSendToAll(SendToAllReq(sendToAll.path, Some(payload)))
            case publish: Publish =>
              val payload = serialization.serializePayload(publish.msg)
              Req().withPublish(PublishReq(publish.topic, Some(payload)))
          }
          .mapMaterializedValue(sessionReqRef => {
            sessionReqRefPromise.success(sessionReqRef)
            NotUsed
          }))
      .watch(sender) // end session when original sender terminates
      .recoverWithRetries(-1,
        {
          case _: WatchedActorTerminatedException => Source.empty
        })
      .map { rsp =>
        serialization.deserializePayload(rsp.payload.get)
      }
      .runForeach(sender ! _)
      .onComplete { result =>
        log.info("Session completed for {} with {}", sender, result)
      }(mat.executionContext)

    sessionReqRefPromise.future
  }

  private def askSend(
      receptionistServiceClient: ClusterClientReceptionistServiceClient,
      send: SendAsk,
      serialization: ClusterClientSerialization)(implicit ec: ExecutionContext): Future[Any] = {
    val payload = serialization.serializePayload(send.msg)
    val sendReq = SendReq(send.path, send.localAffinity, Some(payload))
    receptionistServiceClient.askSend(sendReq).map { rsp =>
      serialization.deserializePayload(rsp.payload.get)
    }
  }
}

/**
 * This actor is intended to be used on an external node that is not member
 * of the cluster. It acts like a gateway for sending messages to actors
 * somewhere in the cluster. With service discovery and Apache Pekko gRPC it will establish
 * a connection to a [[ClusterClientReceptionist]] somewhere in the cluster.
 *
 * You can send messages via the `ClusterClient` to any actor in the cluster
 * that is registered in the [[ClusterClientReceptionist]].
 * Messages are wrapped in [[ClusterClient#Send]], [[ClusterClient#SendToAll]]
 * or [[ClusterClient#Publish]].
 *
 * 1. [[ClusterClient#Send]] -
 * The message will be delivered to one recipient with a matching path, if any such
 * exists. If several entries match the path the message will be delivered
 * to one random destination. The sender of the message can specify that local
 * affinity is preferred, i.e. the message is sent to an actor in the same local actor
 * system as the used receptionist actor, if any such exists, otherwise random to any other
 * matching entry.
 *
 * 2. [[ClusterClient#SendToAll]] -
 * The message will be delivered to all recipients with a matching path.
 *
 * 3. [[ClusterClient#Publish]] -
 * The message will be delivered to all recipients Actors that have been registered as subscribers to
 * to the named topic.
 *
 * Use the factory method [[ClusterClient#props]]) to create the
 * [[org.apache.pekko.actor.Props]] for the actor.
 *
 * If the receptionist is not currently available, the client will buffer the messages
 * and then deliver them when the connection to the receptionist has been established.
 * The size of the buffer is configurable and it can be disabled by using a buffer size
 * of 0. When the buffer is full old messages will be dropped when new messages are sent
 * via the client.
 *
 * Note that this is a best effort implementation: messages can always be lost due to the distributed
 * nature of the actors involved.
 */
final class ClusterClient(settings: ClusterClientSettings)(
    implicit materializer: Materializer) extends Actor
    with ActorLogging {

  import ClusterClient._

  val serialization = new ClusterClientSerialization(context.system)

  private val receptionistServiceClient: ClusterClientReceptionistServiceClient =
    createClientStub(settings)(context.system)

  // Original sender -> stream Source.actorRef of the session
  private var sessionRef: Map[ActorRef, Future[ActorRef]] = Map.empty

  private val killSwitch = KillSwitches.shared(self.path.name)

  override def postStop(): Unit = {
    killSwitch.shutdown()
    super.postStop()
  }

  def receive: Receive = {
    case send: SendAsk =>
      import org.apache.pekko.pattern.pipe
      import context.dispatcher
      askSend(receptionistServiceClient, send, serialization).pipeTo(sender())

    case cmd: Command =>
      // Send or Publish
      val originalSender = sender()
      val session = sessionRef.get(originalSender) match {
        case Some(ses) => ses
        case None =>
          val ses = newSession(
            settings,
            receptionistServiceClient,
            originalSender,
            killSwitch,
            log,
            serialization)
          sessionRef = sessionRef.updated(originalSender, ses)
          ses
      }

      context.watch(originalSender)

      import context.dispatcher
      session.foreach(_ ! cmd)

    case Terminated(ref) =>
      sessionRef -= ref
  }

}

object ClusterClientSettings {

  /**
   * Create settings from the default configuration
   * `sample.cluster.client.grpc`.
   */
  def apply(system: ActorSystem): ClusterClientSettings = {
    val config = system.settings.config.getConfig("sample.cluster.client.grpc")
    val grpcClientSettings = GrpcClientSettings
      // FIXME service discovery
      .connectToServiceAt("127.0.0.1", 50051)(system)
      .withDeadline(3.second) // FIXME config
      .withTls(false)

    new ClusterClientSettings(
      bufferSize = config.getInt("buffer-size"),
      grpcClientSettings)
  }

}

final case class ClusterClientSettings(bufferSize: Int,
    grpcClientSettings: GrpcClientSettings)
